#!/usr/bin/env python3

import argparse
import os
import string
import sys

CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))


def __format(template, **params):
    field_names = [v[1] for v in string.Formatter().parse(template) if v[1] is not None]
    kv_args = {}
    for field in field_names:
        if field in params:
            kv_args[field] = params[field]
        else:
            kv_args[field] = ""

    return template.format(**kv_args)


def instance_create_statement(
    table_name,
    table_columns,
    table_keys,
    table_engine,
    with_deduplication,
    no_merges=True,
):
    template = """
        CREATE TABLE {table_name}
        {table_columns}
        ENGINE = {table_engine}
        ORDER BY {table_keys}
        {table_settings};
        {table_no_merges}
        """

    params = dict()
    params["table_name"] = table_name
    params["table_columns"] = table_columns
    params["table_keys"] = table_keys
    params["table_no_merges"] = f"SYSTEM STOP MERGES {table_name};" if no_merges else ""
    params["table_engine"] = (
        "MergeTree()"
        if table_engine == "MergeTree"
        else f"ReplicatedMergeTree('/clickhouse/tables/{{database}}/{table_name}', '1')"
    )

    deduplication_window_setting_name = (
        "non_replicated_deduplication_window"
        if table_engine == "MergeTree"
        else "replicated_deduplication_window"
    )
    deduplication_window_setting_value = 1000 if with_deduplication else 0

    settings = list()
    settings += [
        f"{deduplication_window_setting_name}={deduplication_window_setting_value}"
    ]
    params["table_settings"] = "SETTINGS " + ",".join(settings)

    return __format(template, **params)


def instance_insert_statement(
    table_name, count, insert_method, insert_unique_blocks, use_insert_token
):
    insert_settings = (
        "" if not use_insert_token else "SETTINGS insert_deduplication_token='UDT'"
    )

    if insert_method == "InsertSelect":
        template = """
            INSERT INTO {table_name}
                SELECT {insert_columns}
                FROM numbers({count}) {insert_settings};
            """
        return __format(
            template,
            table_name=table_name,
            count=count,
            insert_columns=(
                "'src_4', 4"
                if not insert_unique_blocks
                else "'src_' || toString(number), number"
            ),
            insert_settings=insert_settings,
        )

    else:
        template = """
                    INSERT INTO {table_name}
                        {insert_settings} VALUES {insert_values};
                    """

        values = []
        for i in range(count):
            values += (
                [f"('src_{i}', {i})"] if insert_unique_blocks else ["('src_4', 4)"]
            )
        insert_values = ", ".join(values)

        return __format(
            template,
            table_name=table_name,
            insert_settings=insert_settings,
            insert_values=insert_values,
        )


def get_drop_tables_statements(tables):
    return "".join(
        [f"DROP TABLE IF EXISTS {table_name};\n" for table_name in tables[::-1]]
    )


def get_logs_statement(args):
    if args.get_logs:
        return "SET send_logs_level='test';"
    return ""


def str2bool(v):
    if isinstance(v, bool):
        return v
    if v.lower() in ("yes", "true", "t", "y", "1"):
        return True
    elif v.lower() in ("no", "false", "f", "n", "0"):
        return False
    else:
        raise argparse.ArgumentTypeError("Boolean value expected.")


class ArgsFactory:
    def __init__(self, parser):
        self.__parser = parser

    def add_opt_engine(self):
        self.__parser.add_argument(
            "--table-engine",
            choices=["ReplicatedMergeTree", "MergeTree"],
            default="MergeTree",
        )

    def add_opt_user_token(self):
        self.__parser.add_argument(
            "--use-insert-token", type=str2bool, nargs="?", const=True, default=False
        )

    def add_opt_single_thread(self):
        self.__parser.add_argument(
            "--single-thread", type=str2bool, nargs="?", const=True, default=True
        )

    def add_opt_dedup_src(self):
        self.__parser.add_argument(
            "--deduplicate-src-table",
            type=str2bool,
            nargs="?",
            const=True,
            default=True,
        )

    def add_opt_dedup_dst(self):
        self.__parser.add_argument(
            "--deduplicate-dst-table",
            type=str2bool,
            nargs="?",
            const=True,
            default=True,
        )

    def add_opt_get_logs(self):
        self.__parser.add_argument(
            "--get-logs", type=str2bool, nargs="?", const=True, default=False
        )

    def add_opt_uniq_blocks(self):
        self.__parser.add_argument(
            "--insert-unique-blocks", type=str2bool, nargs="?", const=True, default=True
        )

    def add_opt_insert_method(self):
        self.__parser.add_argument(
            "--insert-method",
            choices=["InsertSelect", "InsertValues"],
            default="InsertSelect",
        )

    def add_all(self):
        self.add_opt_engine()
        self.add_opt_user_token()
        self.add_opt_single_thread()
        self.add_opt_dedup_src()
        self.add_opt_dedup_dst()
        self.add_opt_get_logs()
        self.add_opt_insert_method()
        self.add_opt_uniq_blocks()


def test_insert_several_blocks(parser):
    ArgsFactory(parser).add_all()

    def calle(args):
        create_table_a_b_statement = instance_create_statement(
            table_name="table_a_b",
            table_columns="(a String, b UInt64)",
            table_keys="(a, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_src_table,
        )

        create_table_when_b_even_statement = instance_create_statement(
            table_name="table_when_b_even",
            table_columns="(a String, b UInt64)",
            table_keys="(a, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_dst_table,
        )

        create_mv_statement = """
            CREATE MATERIALIZED VIEW mv_b_even
            TO table_when_b_even
            AS
                SELECT a, b
                FROM table_a_b
                WHERE b % 2 = 0;
            """

        drop_tables_statements = get_drop_tables_statements(
            ["table_a_b", "table_when_b_even", "mv_b_even"]
        )

        insert_statement = instance_insert_statement(
            "table_a_b",
            10,
            args.insert_method,
            args.insert_unique_blocks,
            args.use_insert_token,
        )

        print_details_statements = f"""
            SELECT 'table_a_b';
            SELECT 'count', count() FROM table_a_b;
            {"" if not args.get_logs else "SELECT _part, count() FROM table_a_b GROUP BY _part ORDER BY _part;"}

            SELECT 'table_when_b_even';
            SELECT 'count', count() FROM table_when_b_even;
            {"" if not args.get_logs else "SELECT _part, count() FROM table_when_b_even GROUP BY _part ORDER BY _part;"}
            """

        if args.insert_unique_blocks:
            assert_first_insert_statements = f"""
                SELECT throwIf( count() != 10 )
                    FROM table_a_b;
                SELECT throwIf( count() != 5 )
                    FROM table_when_b_even;
                """
            assert_second_insert_statements = f"""
               SELECT throwIf( count() != {10 if args.deduplicate_src_table else 20} )
                   FROM table_a_b;
               SELECT throwIf( count() != {5 if args.deduplicate_dst_table else 10} )
                   FROM table_when_b_even;
               """
        else:
            if args.use_insert_token:
                assert_first_insert_statements = """
                                SELECT throwIf( count() != 10 )
                                    FROM table_a_b;
                                SELECT throwIf( count() != 10 )
                                    FROM table_when_b_even;
                                """
                assert_second_insert_statements = f"""
                               SELECT throwIf( count() != {10 if args.deduplicate_src_table else 20} )
                                   FROM table_a_b;
                               SELECT throwIf( count() != {10 if args.deduplicate_dst_table else 20} )
                                   FROM table_when_b_even;
                               """
            else:
                assert_first_insert_statements = f"""
                    SELECT throwIf( count() != {1 if args.deduplicate_src_table else 10} )
                        FROM table_a_b;
                    SELECT throwIf( count() != {1 if args.deduplicate_dst_table else 10} )
                        FROM table_when_b_even;
                    """
                assert_second_insert_statements = f"""
                    SELECT throwIf( count() != {1 if args.deduplicate_src_table else 20} )
                        FROM table_a_b;
                    SELECT throwIf( count() != {1 if args.deduplicate_dst_table else 20} )
                        FROM table_when_b_even;
                    """

        script = f"""
            {get_logs_statement(args)}

            SET max_insert_threads={1 if args.single_thread else 10};
            SET update_insert_deduplication_token_in_dependent_materialized_views=1;
            SET deduplicate_blocks_in_dependent_materialized_views=1;

            SET max_block_size=1;
            SET min_insert_block_size_rows=0;
            SET min_insert_block_size_bytes=0;

            {drop_tables_statements}

            {create_table_a_b_statement}

            {create_table_when_b_even_statement}

            {create_mv_statement}

            -- first insert
            {insert_statement}

            {print_details_statements}

            {assert_first_insert_statements}

            -- second insert, it is retry
            {insert_statement}

            {print_details_statements}

            {assert_second_insert_statements}

            {drop_tables_statements}
            """

        print(script)

    parser.set_defaults(func=calle)


def test_mv_generates_several_blocks(parser):
    ArgsFactory(parser).add_all()

    def calle(args):
        tables = [
            "table_for_join_with",
            "table_a_b",
            "table_when_b_even_and_joined",
            "mv_b_even",
        ]
        drop_tables_statements = get_drop_tables_statements(tables)

        details_print_for_table_for_join_with = ""
        if args.get_logs:
            details_print_for_table_for_join_with = """
                SELECT 'table_for_join_with';
                SELECT a_join, b, _part FROM table_for_join_with ORDER BY _part, a_join, b;
                """

        create_table_a_b_statement = instance_create_statement(
            table_name="table_a_b",
            table_columns="(a_src String, b UInt64)",
            table_keys="(a_src, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_src_table,
        )

        create_table_when_b_even_and_joined_statement = instance_create_statement(
            table_name="table_when_b_even_and_joined",
            table_columns="(a_src String, a_join String, b UInt64)",
            table_keys="(a_src, a_join, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_dst_table,
        )

        insert_statement = instance_insert_statement(
            "table_a_b",
            5,
            args.insert_method,
            args.insert_unique_blocks,
            args.use_insert_token,
        )

        details_print_statements = f"""
            SELECT 'table_a_b';
            SELECT 'count', count() FROM table_a_b;

            SELECT 'table_when_b_even_and_joined';
            SELECT 'count', count() FROM table_when_b_even_and_joined;
            {"" if not args.get_logs else "SELECT _part, a_src, a_join, b FROM table_when_b_even_and_joined ORDER BY _part;"}
        """

        if args.insert_unique_blocks:
            assert_first_insert_statements = f"""
                SELECT throwIf( count() != 5 )
                    FROM table_a_b;

                SELECT throwIf( count() != 9 )
                    FROM table_when_b_even_and_joined;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {5 if args.deduplicate_src_table else 10} )
                    FROM table_a_b;

                SELECT throwIf( count() != {9 if args.deduplicate_dst_table else 18} )
                    FROM table_when_b_even_and_joined;
                """
        else:
            if args.use_insert_token:
                assert_first_insert_statements = f"""
                    SELECT throwIf( count() != {5 if args.deduplicate_src_table else 5} )
                        FROM table_a_b;

                    SELECT throwIf( count() != {10 if args.deduplicate_dst_table else 10} )
                        FROM table_when_b_even_and_joined;
                    """
                assert_second_insert_statements = f"""
                    SELECT throwIf( count() != {5 if args.deduplicate_src_table else 10} )
                        FROM table_a_b;

                    SELECT throwIf( count() != {10 if args.deduplicate_dst_table else 20} )
                        FROM table_when_b_even_and_joined;
                    """
            else:
                assert_first_insert_statements = f"""
                    SELECT throwIf( count() != {1 if args.deduplicate_src_table else 5} )
                       FROM table_a_b;

                    SELECT throwIf( count() != {2 if args.deduplicate_dst_table else 10} )
                       FROM table_when_b_even_and_joined;
                    """
                assert_second_insert_statements = f"""
                    SELECT throwIf( count() != {1 if args.deduplicate_src_table else 10} )
                        FROM table_a_b;

                    SELECT throwIf( count() != {2 if args.deduplicate_dst_table else 20} )
                        FROM table_when_b_even_and_joined;
                    """

        script = f"""
            {get_logs_statement(args)}

            SET max_insert_threads={1 if args.single_thread else 10};
            SET update_insert_deduplication_token_in_dependent_materialized_views=1;
            SET deduplicate_blocks_in_dependent_materialized_views=1;

            SET max_block_size=1;
            SET min_insert_block_size_rows=0;
            SET min_insert_block_size_bytes=0;

            {drop_tables_statements}

            CREATE TABLE table_for_join_with
                (a_join String, b UInt64)
                ENGINE = MergeTree()
                ORDER BY (a_join, b);
            INSERT INTO table_for_join_with
                SELECT 'joined_' || toString(number), number
                FROM numbers(1);
            {details_print_for_table_for_join_with}

            {create_table_a_b_statement}
            SYSTEM STOP MERGES table_a_b;

            {create_table_when_b_even_and_joined_statement}
            SYSTEM STOP MERGES table_when_b_even_and_joined;

            CREATE MATERIALIZED VIEW mv_b_even
                TO table_when_b_even_and_joined
                AS
                    SELECT a_src, a_join, table_for_join_with.b as b
                        FROM table_a_b
                        FULL OUTER JOIN table_for_join_with
                        ON table_a_b.b = table_for_join_with.b AND table_a_b.b % 2 = 0
                        ORDER BY a_src, a_join, b;

            -- first insert
            {insert_statement}

            {details_print_statements}

            -- first assertion
            {assert_first_insert_statements}

            -- second insert
            {insert_statement}

            {details_print_statements}

            -- second assertion
            {assert_second_insert_statements}

            {drop_tables_statements}
        """

        print(script)

    parser.set_defaults(func=calle)


def test_several_mv_into_one_table(parser):
    ArgsFactory(parser).add_all()

    def calle(args):
        tables = ["table_src", "table_dst", "mv_b_even", "mv_b_even_even"]
        drop_tables_statements = get_drop_tables_statements(tables)

        create_table_src_statement = instance_create_statement(
            table_name="table_src",
            table_columns="(a String, b UInt64)",
            table_keys="(a, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_src_table,
        )

        create_table_dst_statement = instance_create_statement(
            table_name="table_dst",
            table_columns="(a String, b UInt64)",
            table_keys="(a, b)",
            table_engine=args.table_engine,
            with_deduplication=args.deduplicate_dst_table,
        )

        insert_statement = instance_insert_statement(
            "table_src",
            8,
            args.insert_method,
            args.insert_unique_blocks,
            args.use_insert_token,
        )

        details_print_statements = f"""
            SELECT 'table_src count', count() FROM table_src;

            SELECT 'table_dst count', count() FROM table_dst;
            {"" if not args.get_logs else "SELECT _part, count() FROM table_dst GROUP BY _part ORDER BY _part;"}
        """

        if args.insert_unique_blocks:
            assert_first_insert_statements = f"""
                SELECT throwIf( count() != 8 )
                    FROM table_src;

                SELECT throwIf( count() != 6 )
                    FROM table_dst;
                """
            assert_second_insert_statements = f"""
                SELECT throwIf( count() != {8 if args.deduplicate_src_table else 16} )
                    FROM table_src;

                SELECT throwIf( count() != {6 if args.deduplicate_dst_table else 12} )
                    FROM table_dst;
                """
        else:
            if args.use_insert_token:
                assert_first_insert_statements = f"""
                    SELECT throwIf( count() != {8 if args.deduplicate_src_table else 8} )
                        FROM table_src;

                    SELECT throwIf( count() != {16 if args.deduplicate_dst_table else 16} )
                        FROM table_dst;
                    """
                assert_second_insert_statements = f"""
                    SELECT throwIf( count() != {8 if args.deduplicate_src_table else 16} )
                        FROM table_src;

                    SELECT throwIf( count() != {16 if args.deduplicate_dst_table else 32} )
                        FROM table_dst;
                    """
            else:
                assert_first_insert_statements = f"""
                    SELECT throwIf( count() != {1 if args.deduplicate_src_table else 8} )
                        FROM table_src;

                    SELECT throwIf( count() != {2 if args.deduplicate_dst_table else 16} )
                        FROM table_dst;
                    """
                assert_second_insert_statements = f"""
                    SELECT throwIf( count() != {1 if args.deduplicate_src_table else 16} )
                        FROM table_src;

                    SELECT throwIf( count() != {2 if args.deduplicate_dst_table else 32} )
                        FROM table_dst;
                    """

        script = f"""
            {get_logs_statement(args)}

            SET max_insert_threads={1 if args.single_thread else 10};
            SET update_insert_deduplication_token_in_dependent_materialized_views=1;
            SET deduplicate_blocks_in_dependent_materialized_views=1;

            SET max_block_size=1;
            SET min_insert_block_size_rows=0;
            SET min_insert_block_size_bytes=0;

            {drop_tables_statements}

            {create_table_src_statement}

            {create_table_dst_statement}

            CREATE MATERIALIZED VIEW mv_b_even
                TO table_dst
                AS
                SELECT a, b
                    FROM table_src
                    WHERE b % 2 = 0;

            CREATE MATERIALIZED VIEW mv_b_even_even
                TO table_dst
                AS
                SELECT a, b
                    FROM table_src
                    WHERE b % 4 = 0;

            -- first insert
            {insert_statement}

            {details_print_statements}

            {assert_first_insert_statements}

            -- second insert, retry
            {insert_statement}

            {details_print_statements}

            {assert_second_insert_statements}

            {drop_tables_statements}
            """

        print(script)

    parser.set_defaults(func=calle)


def parse_args():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="test")
    test_insert_several_blocks(
        subparsers.add_parser("insert_several_blocks_into_table")
    )
    test_mv_generates_several_blocks(
        subparsers.add_parser("mv_generates_several_blocks")
    )
    test_several_mv_into_one_table(subparsers.add_parser("several_mv_into_one_table"))
    args = parser.parse_args()
    if args.test is None:
        parser.print_help()
    return args


def main():
    args = parse_args()
    if args.test is not None:
        args.func(args)


if __name__ == "__main__":
    main()
